Java Sample Code

2023-04-09 09:35 | Source: compiled from the web

Snapshot data

The snapshot data stores the number of records an operator has counted at the moment a snapshot is taken.
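
To see what "storing the count in a snapshot" means in isolation, the following plain-Java sketch writes a counter object to bytes and reads it back. `CountState` and `SnapshotRoundTrip` are hypothetical names standing in for `UDFState`, and `java.io` object streams are used only for illustration; they are not necessarily the serializer Flink applies to operator state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for UDFState: holds the record count captured in a snapshot.
class CountState implements Serializable {
    private static final long serialVersionUID = 1L;
    private final long count;

    CountState(long count) { this.count = count; }

    long getCount() { return count; }
}

// Hypothetical helper: illustrates a snapshot/restore round trip with Java serialization.
class SnapshotRoundTrip {
    // Write the state object to bytes, as a snapshot would persist it.
    static byte[] snapshot(CountState state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read the state object back from bytes, as a restore would.
    static CountState restore(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (CountState) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = snapshot(new CountState(30000L));
        System.out.println(restore(data).getCount()); // prints 30000
    }
}
```

The point is only that the state object must survive being written out and read back unchanged; this is why `UDFState` implements `Serializable`.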

```java
import java.io.Serializable;

// This class is part of the snapshot and stores the user-defined state.
public class UDFState implements Serializable {
    private long count;

    // Initialize the user-defined state.
    public UDFState() {
        count = 0L;
    }

    // Set the user-defined state.
    public void setState(long count) {
        this.count = count;
    }

    // Get the user-defined state.
    public long getState() {
        return this.count;
    }
}
```

Data source with checkpoints

This is the code of the source operator. It emits 10,000 tuples, pauses for one second, and repeats until the job is cancelled. When a snapshot is taken, the number of tuples sent so far is stored in a UDFState object. When the job is restored from that snapshot, the count is read back from UDFState and assigned to the count variable.
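
The counting and restore behaviour described above can be modelled without Flink. In this sketch, `CountingSourceModel` is a hypothetical class, and a `List<Long>` stands in for the `List<UDFState>` that the real source returns; it is meant to show why record numbering continues from the restored count instead of restarting at 0.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Flink-free model of the counter logic in SEventSourceWithChk.
class CountingSourceModel {
    private long count = 0L;

    // Emit one batch; each record is labelled with the running count, as in "hello-" + count.
    List<String> emitBatch(int batchSize) {
        List<String> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            batch.add("hello-" + count);
            count++;
        }
        return batch;
    }

    // Snapshot: the state is a single-element list holding the current count.
    List<Long> snapshotState() {
        return Collections.singletonList(count);
    }

    // Restore: read the count back; an empty list leaves the count at 0.
    void restoreState(List<Long> state) {
        count = state.isEmpty() ? 0L : state.get(0);
    }

    public static void main(String[] args) {
        CountingSourceModel source = new CountingSourceModel();
        source.emitBatch(3);
        source.emitBatch(3);
        List<Long> snapshot = source.snapshotState();

        CountingSourceModel restored = new CountingSourceModel();
        restored.restoreState(snapshot);
        System.out.println(restored.emitBatch(2)); // prints [hello-6, hello-7]
    }
}
```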

```java
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Source operator with checkpointing.
public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>>
        implements ListCheckpointed<UDFState> {
    private Long count = 0L;
    private volatile boolean isRunning = true;
    private String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321";

    // Main logic: emit 10000 tuples, pause for one second, repeat until cancelled.
    @Override
    public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception {
        Random random = new Random();
        while (isRunning) {
            for (int i = 0; i < 10000; i++) {
                // Emit and increment under the checkpoint lock so that a snapshot
                // never records a count that disagrees with the emitted tuples.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(Tuple4.of(random.nextLong(), "hello-" + count, alphabet, 1));
                    count++;
                }
            }
            Thread.sleep(1000);
        }
    }

    // Called when the task is cancelled.
    @Override
    public void cancel() {
        isRunning = false;
    }

    // Take a custom snapshot: store the current count in a UDFState.
    @Override
    public List<UDFState> snapshotState(long checkpointId, long timestamp) throws Exception {
        UDFState udfState = new UDFState();
        List<UDFState> listState = new ArrayList<>();
        udfState.setState(count);
        listState.add(udfState);
        return listState;
    }

    // Restore the count from a custom snapshot.
    @Override
    public void restoreState(List<UDFState> state) throws Exception {
        if (!state.isEmpty()) {
            count = state.get(0).getState();
        }
    }
}
```

Window operator with checkpoints

This code defines the window operator, which counts the number of tuples in each window and keeps a running total that is saved in snapshots.
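
A minimal Flink-free sketch of the same counting logic follows. `WindowCountModel` is a hypothetical name; each call to `apply` plays the role of one window firing, returning that window's count and adding it to the running total that the real operator checkpoints.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of WindowStatisticWithChk.apply().
class WindowCountModel {
    private long total = 0L;

    // Count the elements of one window and add that count to the running total.
    <T> long apply(Iterable<T> window) {
        long count = 0L;
        for (T ignored : window) {
            count++;
        }
        total += count;
        return count;
    }

    long getTotal() { return total; }

    public static void main(String[] args) {
        WindowCountModel op = new WindowCountModel();
        List<String> first = Arrays.asList("a", "b", "c");
        List<String> second = Arrays.asList("d", "e");
        System.out.println(op.apply(first));  // prints 3
        System.out.println(op.apply(second)); // prints 2
        System.out.println(op.getTotal());    // prints 5
    }
}
```

Only the per-window count is emitted downstream; the total exists purely as operator state, which is why it is the value written into the snapshot.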

```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Window operator with checkpointing.
public class WindowStatisticWithChk
        implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>,
        ListCheckpointed<UDFState> {
    private Long total = 0L;

    // Window logic: count the tuples in the window and add the count to the running total.
    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<Long, String, String, Integer>> input,
                      Collector<Long> out) throws Exception {
        long count = 0L;
        for (Tuple4<Long, String, String, Integer> event : input) {
            count++;
        }
        total += count;
        out.collect(count);
    }

    // Take a custom snapshot: store the running total in a UDFState.
    @Override
    public List<UDFState> snapshotState(long checkpointId, long timestamp) {
        List<UDFState> listState = new ArrayList<>();
        UDFState udfState = new UDFState();
        udfState.setState(total);
        listState.add(udfState);
        return listState;
    }

    // Restore the running total from a custom snapshot.
    @Override
    public void restoreState(List<UDFState> state) throws Exception {
        if (!state.isEmpty()) {
            total = state.get(0).getState();
        }
    }
}
```

Application code

This code defines the StreamGraph and implements the job logic. Processing time (proctime) is used as the timestamp that triggers the windows.
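
The sliding windows used here are 4 s long and start every 1 s, so each processing-time instant falls into four overlapping windows. The sketch below computes those windows for a single timestamp. `SlidingWindowAssigner` is a hypothetical helper; the arithmetic is our reading of how `SlidingProcessingTimeWindows` assigns windows, simplified to a zero offset and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: sliding-window assignment, assuming offset 0 and non-negative timestamps.
class SlidingWindowAssigner {
    // Start of the latest slide-aligned window that contains the timestamp.
    static long windowStart(long timestamp, long slide) {
        return timestamp - (timestamp % slide);
    }

    // Return the [start, end) bounds of every sliding window that contains the timestamp.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 4 s windows sliding every 1 s, for a record seen at t = 5500 ms.
        for (long[] w : assign(5500L, 4000L, 1000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [5000, 9000), [4000, 8000), [3000, 7000), [2000, 6000) on separate lines
    }
}
```

Because every tuple lands in four windows, the running `total` kept by `WindowStatisticWithChk` grows to roughly four times the number of tuples emitted by the source.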

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkProcessingTimeAPIChkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the state backend and enable exactly-once checkpoints every 6000 ms.
        env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink/checkpoint/"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(6000);

        // Application logic: key by field 0, count tuples in 4 s windows sliding every 1 s, print the counts.
        env.addSource(new SEventSourceWithChk())
                .keyBy(0)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1)))
                .apply(new WindowStatisticWithChk())
                .print();

        env.execute();
    }
}
```

